gRPC 元数据

元数据

元数据(metadata)是指在处理RPC请求和响应过程中需要但又不属于具体业务(例如身份验证详细信息)的信息,采用键值对列表的形式,其中键是string类型,值通常是[]string类型,但也可以是二进制数据。gRPC中的 metadata 类似于我们在 HTTP headers中的键值对,元数据可以包含认证token、请求标识和监控标签等。

metadata中的键是大小写不敏感的,由字母、数字和特殊字符-_.组成并且不能以grpc-开头(gRPC保留自用),二进制值的键名必须以-bin结尾。在Go语言中我们是用 google.golang.org/grpc/metadata 这个库来操作metadata。

metadata 类型定义如下:这个 map 的值类型是[]string,因此用户可以使用一个键附加多个值。

1
type MD map[string][]string

创建 metadata

方法1:使用 metadata.New 创建(建议使用)

1
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})

方法2:使用 metadata.Pairs 合并到一个列表中

1
2
3
4
5
6
md := metadata.Pairs(
"key1", "val1",
"key1", "val1-2", // "key1"的值将会是 []string{"val1", "val1-2"}
"key2", "val2",
"key-bin", string([]byte{96, 102}), // 二进制数据在发送前会进行(base64) 编码,收到后会进行解码
)

注意,所有的键将自动转换为小写。

客户端

发送元数据

元数据的传递,是放在 ctx 中进行的,客户端可以通过 AppendToOutgoingContext(追加的方式) 将元数据附加到 ctx 中,也可以使用 NewOutgoingContext (如果 ctx 中已经存在,则会替换)来进行元数据的添加。

1
2
3
4
5
6
7
8
9
10
11
// 创建带有metadata的context
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")

// 添加一些 metadata 到 context (e.g. in an interceptor)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")

// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)

// 或者发起流式RPC请求
stream, err := client.SomeStreamingRPC(ctx)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建带有metadata的context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 添加一些metadata到context (e.g. in an interceptor)
send, _ := metadata.FromOutgoingContext(ctx)
newMD := metadata.Pairs("k3", "v3")
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD))

// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)

// 或者发起流式RPC请求
stream, err := client.SomeStreamingRPC(ctx)

接收元数据

客户端可以接收的元数据包括 header 和 trailer。header在数据之前,trailer在数据之后。

普通 RPC:

1
2
3
4
5
6
7
8
9
var header, trailer metadata.MD // 声明存储header和trailer的变量
r, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // 将会接收header
grpc.Trailer(&trailer), // 将会接收trailer
)

// do something with header and trailer

流式 RPC:

1
2
3
4
5
6
7
stream, err := client.SomeStreamingRPC(ctx)

// 接收 header
header, err := stream.Header()

// 接收 trailer
trailer := stream.Trailer()

服务端

发送元数据

普通 RPC:

1
2
3
4
5
6
7
8
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创建和发送 header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// 创建和发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}

流式RPC:

1
2
3
4
5
6
7
8
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// 创建和发送 header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// 创建和发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}

接收元数据

普通 RPC:

1
2
3
4
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}

流式RPC:

1
2
3
4
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}

普通 RPC 传递元数据示例

完整代码可参考:https://github.com/rexyan/Go-Microservice/tree/main/metadata

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"client/pb"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"time"
)

func main() {
conn, err := grpc.Dial("127.0.0.1:9999", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
c := pb.NewGreeterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// 设置元数据,注意这里传送到服务端后 key 变成了小写
ctx = metadata.AppendToOutgoingContext(ctx, "Authorization", "Token")

// 调用方法并接收服务端返回的元数据
var header, trailer metadata.MD
response, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "zh",
}, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
fmt.Printf("Call SayHello Error: %v\n", err.Error())
}

fmt.Println(response.Reply)
// 元数据
fmt.Println(header)
fmt.Println(trailer)
/**
hello: zh
map[content-type:[application/grpc] timestamp:[1679579992315]]
map[location:[ShangHai]]
*/
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"net"
"server/pb"
"strconv"
"time"
)

type server struct {
pb.UnimplementedGreeterServer
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
// 设置 元数据 trailer
defer func() {
md := metadata.New(map[string]string{"Location": "ShangHai"})
err := grpc.SetTrailer(ctx, md)
if err != nil {
return
}
}()

// 获取元数据信息
md, ok := metadata.FromIncomingContext(ctx)
if ok {
fmt.Printf("metadata %v\n", md)
// 这里要注意,所有的 key 都变为了小写
Authorization := md.Get("authorization")
if len(Authorization) > 0 {
if Authorization[0] != "Token" {
return nil, status.Error(codes.Unauthenticated, "Authorization Error!")
}
} else {
return nil, status.Error(codes.Unauthenticated, "Authorization Metadata Not Found!")
}
} else {
return nil, status.Error(codes.Unauthenticated, "Metadata Not Found!")
}

// 设置元数据信息
// 设置 元数据 header
headerMd := metadata.New(map[string]string{"Timestamp": strconv.Itoa(int(time.Now().UnixMilli()))})
err := grpc.SetHeader(ctx, headerMd)
if err != nil {
return nil, err
}

// 返回数据
return &pb.HelloResponse{Reply: "hello: " + in.Name}, nil
}

func main() {
listen, err := net.Listen("tcp", ":9999")
if err != nil {
return
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
err = s.Serve(listen)
if err != nil {
return
}
}